% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
%   http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_db).

-export([
    create/2,
    open/2,
    open_int/2,
    incref/1,
    reopen/1,
    close/1,
    exists/1,

    clustered_db/2,
    clustered_db/3,

    monitor/1,
    monitored_by/1,
    is_idle/1,

    is_admin/1,
    check_is_admin/1,
    check_is_member/1,

    name/1,
    get_after_doc_read_fun/1,
    get_before_doc_update_fun/1,
    get_committed_update_seq/1,
    get_compacted_seq/1,
    get_compactor_pid/1,
    get_compactor_pid_sync/1,
    get_db_info/1,
    get_partition_info/2,
    get_del_doc_count/1,
    get_doc_count/1,
    get_epochs/1,
    get_filepath/1,
    get_instance_start_time/1,
    get_pid/1,
    get_revs_limit/1,
    get_security/1,
    get_update_seq/1,
    get_user_ctx/1,
    get_uuid/1,
    get_purge_seq/1,
    get_oldest_purge_seq/1,
    get_purge_infos_limit/1,

    is_db/1,
    is_system_db/1,
    is_clustered/1,
    is_system_db_name/1,
    is_partitioned/1,

    set_revs_limit/2,
    set_purge_infos_limit/2,
    set_security/2,
    set_user_ctx/2,

    load_validation_funs/1,
    reload_validation_funs/1,

    open_doc/2,
    open_doc/3,
    open_doc_revs/3,
    open_doc_revs/4,
    open_doc_int/3,
    get_doc_info/2,
    get_full_doc_info/2,
    get_full_doc_infos/2,
    get_missing_revs/2,
    get_design_doc/2,
    get_design_docs/1,
    get_design_doc_count/1,
    get_purge_infos/2,

    get_minimum_purge_seq/1,
    purge_client_exists/3,

    validate_docid/2,
    doc_from_json_obj_validate/2,

    update_doc/3,
    update_doc/4,
    update_docs/4,
    update_docs/2,
    update_docs/3,
    delete_doc/3,

    purge_docs/2,
    purge_docs/3,

    with_stream/3,
    open_write_stream/2,
    open_read_stream/2,
    is_active_stream/2,

    fold_docs/3,
    fold_docs/4,
    fold_local_docs/4,
    fold_design_docs/4,
    fold_changes/4,
    fold_changes/5,
    count_changes_since/2,
    fold_purge_infos/4,
    fold_purge_infos/5,

    calculate_start_seq/3,
    owner_of/2,

    start_compact/1,
    cancel_compact/1,
    wait_for_compaction/1,
    wait_for_compaction/2,
    is_compacting/1,

    dbname_suffix/1,
    normalize_dbname/1,
    validate_dbname/1,

    make_doc/5,
    new_revid/1
]).

-export([
    start_link/4
]).

-include_lib("couch/include/couch_db.hrl").
-include("couch_db_int.hrl").

-define(DBNAME_REGEX,
    % use the stock CouchDB regex
    "^[a-z][a-z0-9\\_\\$()\\+\\-\\/]*"
    % but allow an optional shard timestamp at the end
    "(\\.[0-9]{10,})?$"
).
-define(DEFAULT_COMPRESSIBLE_TYPES,
    "text/*, application/javascript, application/json, application/xml"
).
% Purge client max lag window in seconds (defaulting to 24 hours)
-define(PURGE_LAG_SEC, 86400).

start_link(Engine, DbName, Filepath, Options) ->
    Arg = {Engine, DbName, Filepath, Options},
    proc_lib:start_link(couch_db_updater, init, [Arg]).

create(DbName, Options) ->
    couch_server:create(DbName, Options).

% this is for opening a database for internal purposes like the replicator
% or the view indexer. it never throws a reader error.
open_int(DbName, Options) ->
    couch_server:open(DbName, Options).

% this should be called anytime an http request opens the database.
% it ensures that the http userCtx is a valid reader
open(DbName, Options) ->
    case couch_server:open(DbName, Options) of
        {ok, Db} ->
            try
                check_is_member(Db),
                {ok, Db}
            catch
                throw:Error ->
                    close(Db),
                    throw(Error)
            end;
        Else ->
            Else
    end.

reopen(#db{} = Db) ->
    % We could have just swapped out the storage engine
    % for this database during a compaction so we just
    % reimplement this as a close/open pair now.
    try
        open(Db#db.name, [{user_ctx, Db#db.user_ctx} | Db#db.options])
    after
        close(Db)
    end.

% You shouldn't call this. Its part of the ref counting between
% couch_server and couch_db instances.
incref(#db{} = Db) ->
    couch_db_engine:incref(Db).

clustered_db(DbName, Options) when is_list(Options) ->
    UserCtx = couch_util:get_value(user_ctx, Options, #user_ctx{}),
    SecProps = couch_util:get_value(security, Options, []),
    Props = couch_util:get_value(props, Options, []),
    {ok, #db{
        name = DbName,
        user_ctx = UserCtx,
        security = SecProps,
        options = [{props, Props}]
    }};
clustered_db(DbName, #user_ctx{} = UserCtx) ->
    clustered_db(DbName, [{user_ctx, UserCtx}]).

clustered_db(DbName, UserCtx, SecProps) ->
    clustered_db(DbName, [{user_ctx, UserCtx}, {security, SecProps}]).

is_db(#db{}) ->
    true;
is_db(_) ->
    false.

is_system_db(#db{options = Options}) ->
    lists:member(sys_db, Options).

is_clustered(#db{main_pid = nil}) ->
    true;
is_clustered(#db{}) ->
    false;
is_clustered(?OLD_DB_REC = Db) ->
    ?OLD_DB_MAIN_PID(Db) == undefined.

is_partitioned(#db{options = Options}) ->
    Props = couch_util:get_value(props, Options, []),
    couch_util:get_value(partitioned, Props, false).

close(#db{} = Db) ->
    ok = couch_db_engine:decref(Db);
close(?OLD_DB_REC) ->
    ok.

exists(DbName) ->
    couch_server:exists(DbName).

is_idle(#db{compactor_pid = nil} = Db) ->
    monitored_by(Db) == [];
is_idle(_Db) ->
    false.

monitored_by(Db) ->
    case couch_db_engine:monitored_by(Db) of
        Pids when is_list(Pids) ->
            PidTracker = whereis(couch_stats_process_tracker),
            Pids -- [Db#db.main_pid, PidTracker];
        undefined ->
            []
    end.

monitor(#db{main_pid = MainPid}) ->
    erlang:monitor(process, MainPid).

start_compact(#db{} = Db) ->
    gen_server:call(Db#db.main_pid, start_compact).

cancel_compact(#db{main_pid = Pid}) ->
    gen_server:call(Pid, cancel_compact).

wait_for_compaction(Db) ->
    wait_for_compaction(Db, infinity).

wait_for_compaction(#db{main_pid = Pid} = Db, Timeout) ->
    Start = os:timestamp(),
    case gen_server:call(Pid, compactor_pid) of
        CPid when is_pid(CPid) ->
            Ref = erlang:monitor(process, CPid),
            receive
                {'DOWN', Ref, _, _, normal} when Timeout == infinity ->
                    wait_for_compaction(Db, Timeout);
                {'DOWN', Ref, _, _, normal} ->
                    Elapsed = timer:now_diff(os:timestamp(), Start) div 1000,
                    wait_for_compaction(Db, Timeout - Elapsed);
                {'DOWN', Ref, _, _, Reason} ->
                    {error, Reason}
            after Timeout ->
                erlang:demonitor(Ref, [flush]),
                {error, Timeout}
            end;
        _ ->
            ok
    end.

is_compacting(DbName) ->
    couch_server:is_compacting(DbName).

delete_doc(Db, Id, Revisions) ->
    DeletedDocs = [#doc{id = Id, revs = [Rev], deleted = true} || Rev <- Revisions],
    {ok, [Result]} = update_docs(Db, DeletedDocs, []),
    {ok, Result}.

open_doc(Db, IdOrDocInfo) ->
    open_doc(Db, IdOrDocInfo, []).

open_doc(Db, Id, Options) ->
    increment_stat(Db, [couchdb, database_reads]),
    case open_doc_int(Db, Id, Options) of
        {ok, #doc{deleted = true} = Doc} ->
            case lists:member(deleted, Options) of
                true ->
                    apply_open_options({ok, Doc}, Options);
                false ->
                    {not_found, deleted}
            end;
        Else ->
            apply_open_options(Else, Options)
    end.

apply_open_options({ok, Doc}, Options) ->
    apply_open_options2(Doc, Options);
apply_open_options(Else, _Options) ->
    Else.

apply_open_options2(Doc, []) ->
    {ok, Doc};
apply_open_options2(
    #doc{atts = Atts0, revs = Revs} = Doc,
    [{atts_since, PossibleAncestors} | Rest]
) ->
    RevPos = find_ancestor_rev_pos(Revs, PossibleAncestors),
    Atts = lists:map(
        fun(Att) ->
            [AttPos, Data] = couch_att:fetch([revpos, data], Att),
            if
                AttPos > RevPos -> couch_att:store(data, Data, Att);
                true -> couch_att:store(data, stub, Att)
            end
        end,
        Atts0
    ),
    apply_open_options2(Doc#doc{atts = Atts}, Rest);
apply_open_options2(Doc, [ejson_body | Rest]) ->
    apply_open_options2(couch_doc:with_ejson_body(Doc), Rest);
apply_open_options2(Doc, [_ | Rest]) ->
    apply_open_options2(Doc, Rest).

find_ancestor_rev_pos({_, []}, _AttsSinceRevs) ->
    0;
find_ancestor_rev_pos(_DocRevs, []) ->
    0;
find_ancestor_rev_pos({RevPos, [RevId | Rest]}, AttsSinceRevs) ->
    case lists:member({RevPos, RevId}, AttsSinceRevs) of
        true ->
            RevPos;
        false ->
            find_ancestor_rev_pos({RevPos - 1, Rest}, AttsSinceRevs)
    end.

open_doc_revs(Db, IdRevsOpts, Options) when is_list(IdRevsOpts) ->
    increment_stat(Db, [couchdb, database_reads], length(IdRevsOpts)),
    % IdRevsOpts looks like [{{Id, Revs}, DocOpts}, ...]
    {IdRevs, DocOptsOnly} = lists:unzip(IdRevsOpts),
    % Function open_doc_revs_int takes [{Id, Revs},...] as its argument
    AllResults = open_doc_revs_int(Db, IdRevs, Options),
    % Apply document open options like {atts_since, ...} etc
    ResultsZipFun = fun(DocOpts, {ok, Results}) ->
        [apply_open_options(R, DocOpts) || R <- Results]
    end,
    lists:zipwith(ResultsZipFun, DocOptsOnly, AllResults).

open_doc_revs(Db, Id, Revs, Options) ->
    increment_stat(Db, [couchdb, database_reads]),
    [{ok, Results}] = open_doc_revs_int(Db, [{Id, Revs}], Options),
    {ok, [apply_open_options(Result, Options) || Result <- Results]}.

% Each returned result is a list of tuples:
% {Id, MissingRevs, PossibleAncestors}
% if no revs are missing, MissingRevs is []
get_missing_revs(Db, IdRevsList) ->
    FDIs = get_full_doc_infos(Db, [Id || {Id, _Revs} <- IdRevsList]),
    Results = lists:zipwith(
        fun
            ({Id, Revs}, #full_doc_info{rev_tree = RevTree} = FDI) ->
                MissingRevs = couch_key_tree:find_missing(RevTree, Revs),
                {Id, MissingRevs, possible_ancestors(FDI, MissingRevs)};
            ({Id, Revs}, not_found) ->
                {Id, Revs, []}
        end,
        IdRevsList,
        FDIs
    ),
    {ok, Results}.

get_doc_info(Db, Id) ->
    case get_full_doc_info(Db, Id) of
        #full_doc_info{} = FDI ->
            {ok, couch_doc:to_doc_info(FDI)};
        Else ->
            Else
    end.

get_full_doc_info(Db, Id) ->
    [Result] = get_full_doc_infos(Db, [Id]),
    Result.

get_full_doc_infos(Db, Ids) ->
    couch_db_engine:open_docs(Db, Ids).

purge_docs(Db, IdRevs) ->
    purge_docs(Db, IdRevs, []).

-spec purge_docs(#db{}, [{UUId, Id, [Rev]}], [PurgeOption]) ->
    {ok, [Reply]}
when
    UUId :: binary(),
    Id :: binary() | list(),
    Rev :: {non_neg_integer(), binary()},
    PurgeOption :: interactive_edit | replicated_changes,
    Reply :: {ok, []} | {ok, [Rev]}.
purge_docs(#db{main_pid = Pid} = Db, UUIDsIdsRevs, Options) ->
    UUIDsIdsRevs1 = [
        {UUID, couch_util:to_binary(Id), lists:usort(Revs)}
     || {UUID, Id, Revs} <- UUIDsIdsRevs
    ],
    % Gather any existing purges with the same UUIDs
    UUIDs = element(1, lists:unzip3(UUIDsIdsRevs1)),
    Old1 = get_purge_infos(Db, UUIDs),
    Old2 = maps:from_list([{UUID, {Id, Revs}} || {_, UUID, Id, Revs} <- Old1]),
    % Filter out all the purges which have already been processed
    FilterCheckFun = fun({UUID, Id, Revs}) ->
        case maps:is_key(UUID, Old2) of
            true ->
                #{UUID := {OldId, OldRevs}} = Old2,
                case Id =:= OldId andalso lists:usort(Revs) =:= lists:usort(OldRevs) of
                    true ->
                        % Processed this request already, so filter it out
                        false;
                    false ->
                        % Oops! Same UUID but somehow different Id or Revs
                        Fmt = "Duplicate purge info UUID: ~s DocId: ~s",
                        Reason = lists:flatten(io_lib:format(Fmt, [UUID, Id])),
                        throw({badreq, Reason})
                end;
            false ->
                % Have not seen this request yet, so we keep it
                true
        end
    end,
    UUIDsIdsRevs2 = lists:filter(FilterCheckFun, UUIDsIdsRevs1),
    % If all the purges are already applied, skip calling the updater
    case UUIDsIdsRevs2 of
        [] ->
            {ok, []};
        [_ | _] ->
            increment_stat(Db, [couchdb, database_purges]),
            gen_server:call(Pid, {purge_docs, UUIDsIdsRevs2, Options}, infinity)
    end.

-spec get_purge_infos(#db{}, [UUId]) -> [PurgeInfo] when
    UUId :: binary(),
    PurgeInfo :: {PurgeSeq, UUId, Id, [Rev]} | not_found,
    PurgeSeq :: non_neg_integer(),
    Id :: binary(),
    Rev :: {non_neg_integer(), binary()}.
get_purge_infos(Db, UUIDs) ->
    couch_db_engine:load_purge_infos(Db, UUIDs).

get_minimum_purge_seq(#db{} = Db) ->
    PurgeSeq = couch_db_engine:get_purge_seq(Db),
    OldestPurgeSeq = couch_db_engine:get_oldest_purge_seq(Db),
    PurgeInfosLimit = couch_db_engine:get_purge_infos_limit(Db),

    FoldFun = fun(#doc{id = DocId, body = {Props}}, SeqAcc) ->
        case DocId of
            <<?LOCAL_DOC_PREFIX, "purge-", _/binary>> ->
                ClientSeq = couch_util:get_value(<<"purge_seq">>, Props),
                DbName = couch_db:name(Db),
                % If there's a broken doc we have to keep every
                % purge info until the doc is fixed or removed.
                case ClientSeq of
                    CS when is_integer(CS), CS >= PurgeSeq - PurgeInfosLimit ->
                        {ok, SeqAcc};
                    CS when is_integer(CS) ->
                        case purge_client_exists(DbName, DocId, Props) of
                            true ->
                                {ok, erlang:min(CS, SeqAcc)};
                            false ->
                                Fmt1 =
                                    "Missing or stale purge doc '~s' on ~p "
                                    "with purge_seq '~w'",
                                couch_log:error(Fmt1, [DocId, DbName, ClientSeq]),
                                {ok, SeqAcc}
                        end;
                    _ ->
                        Fmt2 = "Invalid purge doc '~s' on ~p with purge_seq '~w'",
                        couch_log:error(Fmt2, [DocId, DbName, ClientSeq]),
                        {ok, erlang:min(OldestPurgeSeq, SeqAcc)}
                end;
            _ ->
                {stop, SeqAcc}
        end
    end,
    InitMinSeq = PurgeSeq - PurgeInfosLimit,
    Opts = [
        {start_key, list_to_binary(?LOCAL_DOC_PREFIX ++ "purge-")}
    ],
    {ok, MinIdxSeq} = couch_db:fold_local_docs(Db, FoldFun, InitMinSeq, Opts),
    FinalSeq =
        case MinIdxSeq < PurgeSeq - PurgeInfosLimit of
            true -> MinIdxSeq;
            false -> erlang:max(0, PurgeSeq - PurgeInfosLimit)
        end,
    % Log a warning if we've got a purge sequence exceeding the
    % configured threshold.
    if
        FinalSeq >= (PurgeSeq - PurgeInfosLimit) ->
            ok;
        true ->
            Fmt = "The purge sequence for '~s' exceeds configured threshold",
            couch_log:warning(Fmt, [couch_db:name(Db)])
    end,
    FinalSeq.

purge_client_exists(DbName, DocId, Props) ->
    try
        Exists = couch_db_plugin:is_valid_purge_client(DbName, Props),
        case Exists of
            true ->
                Updated = couch_util:get_value(<<"updated_on">>, Props),
                purge_client_warn_lag(Updated, DbName, DocId);
            _ ->
                ok
        end,
        Exists
    catch
        Tag:Error ->
            % If we fail to check for a client we have to assume that
            % it exists.
            Log = "~p : Failed to check purge checkpoint ~p : ~s. Error ~p:~p",
            couch_log:error(Log, [?MODULE, DbName, DocId, Tag, Error]),
            true
    end.

set_purge_infos_limit(#db{main_pid = Pid} = Db, Limit) when Limit > 0 ->
    check_is_admin(Db),
    gen_server:call(Pid, {set_purge_infos_limit, Limit}, infinity);
set_purge_infos_limit(_Db, _Limit) ->
    throw(invalid_purge_infos_limit).

get_after_doc_read_fun(#db{after_doc_read = Fun}) ->
    Fun.

get_before_doc_update_fun(#db{before_doc_update = Fun}) ->
    Fun.

get_committed_update_seq(#db{committed_update_seq = Seq}) ->
    Seq.

get_update_seq(#db{} = Db) ->
    couch_db_engine:get_update_seq(Db).

get_user_ctx(#db{user_ctx = UserCtx}) ->
    UserCtx;
get_user_ctx(?OLD_DB_REC = Db) ->
    ?OLD_DB_USER_CTX(Db).

get_purge_seq(#db{} = Db) ->
    couch_db_engine:get_purge_seq(Db).

get_oldest_purge_seq(#db{} = Db) ->
    couch_db_engine:get_oldest_purge_seq(Db).

get_purge_infos_limit(#db{} = Db) ->
    couch_db_engine:get_purge_infos_limit(Db).

get_pid(#db{main_pid = Pid}) ->
    Pid.

get_del_doc_count(Db) ->
    {ok, couch_db_engine:get_del_doc_count(Db)}.

get_doc_count(Db) ->
    {ok, couch_db_engine:get_doc_count(Db)}.

get_uuid(#db{} = Db) ->
    couch_db_engine:get_uuid(Db).

get_epochs(#db{} = Db) ->
    Epochs = couch_db_engine:get_epochs(Db),
    validate_epochs(Epochs),
    Epochs.

get_filepath(#db{filepath = FilePath}) ->
    FilePath.

get_instance_start_time(#db{instance_start_time = IST}) ->
    IST.

get_compacted_seq(#db{} = Db) ->
    couch_db_engine:get_compacted_seq(Db).

get_compactor_pid(#db{compactor_pid = Pid}) ->
    Pid.

get_compactor_pid_sync(#db{main_pid = Pid}) ->
    case gen_server:call(Pid, compactor_pid, infinity) of
        CPid when is_pid(CPid) ->
            CPid;
        _ ->
            nil
    end.

get_db_info(Db) ->
    #db{
        name = Name,
        compactor_pid = Compactor,
        instance_start_time = StartTime,
        committed_update_seq = CommittedUpdateSeq
    } = Db,
    {ok, DocCount} = get_doc_count(Db),
    {ok, DelDocCount} = get_del_doc_count(Db),
    SizeInfo = couch_db_engine:get_size_info(Db),
    DiskVersion = couch_db_engine:get_disk_version(Db),
    Uuid =
        case get_uuid(Db) of
            undefined -> null;
            Uuid0 -> Uuid0
        end,
    CompactedSeq =
        case get_compacted_seq(Db) of
            undefined -> null;
            Else1 -> Else1
        end,
    Props =
        case couch_db_engine:get_props(Db) of
            undefined -> null;
            Else2 -> {Else2}
        end,
    InfoList = [
        {db_name, Name},
        {engine, couch_db_engine:get_engine(Db)},
        {doc_count, DocCount},
        {doc_del_count, DelDocCount},
        {update_seq, get_update_seq(Db)},
        {purge_seq, couch_db_engine:get_purge_seq(Db)},
        {compact_running, Compactor /= nil},
        {sizes, {SizeInfo}},
        {instance_start_time, StartTime},
        {disk_format_version, DiskVersion},
        {committed_update_seq, CommittedUpdateSeq},
        {compacted_seq, CompactedSeq},
        {props, Props},
        {uuid, Uuid}
    ],
    {ok, InfoList}.

get_partition_info(#db{} = Db, Partition) when is_binary(Partition) ->
    Info = couch_db_engine:get_partition_info(Db, Partition),
    {ok, Info};
get_partition_info(_Db, _Partition) ->
    throw({bad_request, <<"`partition` is not valid">>}).

get_design_doc(#db{name = <<"shards/", _/binary>> = ShardDbName}, DDocId0) ->
    DDocId = couch_util:normalize_ddoc_id(DDocId0),
    DbName = mem3:dbname(ShardDbName),
    {_, Ref} = spawn_monitor(fun() ->
        exit(fabric:open_doc(DbName, DDocId, []))
    end),
    receive
        {'DOWN', Ref, _, _, Response} ->
            Response
    end;
get_design_doc(#db{} = Db, DDocId0) ->
    DDocId = couch_util:normalize_ddoc_id(DDocId0),
    couch_db:open_doc_int(Db, DDocId, [ejson_body]).

% Note: get_design_docs/1 for clustered db shards returns docs as ejson {[_ |
% _]} and for simple node-local dbs it returns #full_docs_info{} records. To
% obtain ejson docs in that case would need to call couch_db:open_doc_int(Db,
% FDI, [ejson_body]) for each one of them.
%
get_design_docs(#db{name = <<"shards/", _/binary>> = ShardDbName}) ->
    DbName = mem3:dbname(ShardDbName),
    {_, Ref} = spawn_monitor(fun() -> exit(fabric:design_docs(DbName)) end),
    receive
        {'DOWN', Ref, _, _, Response} ->
            Response
    end;
get_design_docs(#db{} = Db) ->
    FoldFun = fun(FDI, Acc) -> {ok, [FDI | Acc]} end,
    {ok, Docs} = fold_design_docs(Db, FoldFun, [], []),
    {ok, lists:reverse(Docs)}.

get_design_doc_count(#db{} = Db) ->
    FoldFun = fun(_, Acc) -> {ok, Acc + 1} end,
    fold_design_docs(Db, FoldFun, 0, []).

check_is_admin(#db{user_ctx = UserCtx} = Db) ->
    case is_admin(Db) of
        true ->
            ok;
        false ->
            Reason = <<"You are not a db or server admin.">>,
            throw_security_error(UserCtx, Reason)
    end.

check_is_member(#db{user_ctx = UserCtx} = Db) ->
    case is_member(Db) of
        true -> ok;
        false -> throw_security_error(UserCtx)
    end.

is_admin(#db{user_ctx = UserCtx} = Db) ->
    case couch_db_plugin:check_is_admin(Db) of
        true ->
            true;
        false ->
            {Admins} = get_admins(Db),
            is_authorized(UserCtx, Admins)
    end.

is_member(#db{user_ctx = UserCtx} = Db) ->
    case is_admin(Db) of
        true ->
            true;
        false ->
            case is_public_db(Db) of
                true ->
                    true;
                false ->
                    {Members} = get_members(Db),
                    is_authorized(UserCtx, Members)
            end
    end.

is_public_db(#db{} = Db) ->
    {Members} = get_members(Db),
    Names = couch_util:get_value(<<"names">>, Members, []),
    Roles = couch_util:get_value(<<"roles">>, Members, []),
    Names =:= [] andalso Roles =:= [].

is_authorized(#user_ctx{name = UserName, roles = UserRoles}, Security) ->
    Names = couch_util:get_value(<<"names">>, Security, []),
    Roles = couch_util:get_value(<<"roles">>, Security, []),
    case check_security(roles, UserRoles, [<<"_admin">> | Roles]) of
        true -> true;
        false -> check_security(names, UserName, Names)
    end.

check_security(roles, [_ | _] = UserRoles, [_ | _] = Roles) ->
    UserRolesSet = ordsets:from_list(UserRoles),
    RolesSet = ordsets:from_list(Roles),
    not ordsets:is_disjoint(UserRolesSet, RolesSet);
check_security(roles, _, _) ->
    false;
check_security(names, null, _) ->
    false;
check_security(names, UserName, [_ | _] = Names) ->
    lists:member(UserName, Names);
check_security(names, _, _) ->
    false.

throw_security_error(#user_ctx{name = null} = UserCtx) ->
    Reason = <<"You are not authorized to access this db.">>,
    throw_security_error(UserCtx, Reason);
throw_security_error(#user_ctx{name = _} = UserCtx) ->
    Reason = <<"You are not allowed to access this db.">>,
    throw_security_error(UserCtx, Reason).
throw_security_error(#user_ctx{} = UserCtx, Reason) ->
    Error = security_error_type(UserCtx),
    throw({Error, Reason}).

security_error_type(#user_ctx{name = null}) ->
    unauthorized;
security_error_type(#user_ctx{name = _}) ->
    forbidden.

get_admins(#db{security = SecProps}) ->
    couch_util:get_value(<<"admins">>, SecProps, {[]}).

get_members(#db{security = SecProps}) ->
    % we fallback to readers here for backwards compatibility
    couch_util:get_value(
        <<"members">>,
        SecProps,
        couch_util:get_value(<<"readers">>, SecProps, {[]})
    ).

get_security(#db{security = SecProps}) ->
    {SecProps};
get_security(?OLD_DB_REC = Db) ->
    {?OLD_DB_SECURITY(Db)}.

set_security(#db{main_pid = Pid} = Db, {NewSecProps}) when is_list(NewSecProps) ->
    check_is_admin(Db),
    ok = validate_security_object(NewSecProps),
    gen_server:call(Pid, {set_security, NewSecProps}, infinity);
set_security(_, _) ->
    throw(bad_request).

set_user_ctx(#db{} = Db, UserCtx) ->
    {ok, Db#db{user_ctx = UserCtx}}.

validate_security_object(SecProps) ->
    Admins = couch_util:get_value(<<"admins">>, SecProps, {[]}),
    % we fallback to readers here for backwards compatibility
    Members = couch_util:get_value(
        <<"members">>,
        SecProps,
        couch_util:get_value(<<"readers">>, SecProps, {[]})
    ),
    ok = validate_names_and_roles(Admins),
    ok = validate_names_and_roles(Members),
    ok.

% validate user input
validate_names_and_roles({Props}) when is_list(Props) ->
    case couch_util:get_value(<<"names">>, Props, []) of
        Ns when is_list(Ns) ->
            [throw("names must be a JSON list of strings") || N <- Ns, not is_binary(N)],
            Ns;
        _ ->
            throw("names must be a JSON list of strings")
    end,
    case couch_util:get_value(<<"roles">>, Props, []) of
        Rs when is_list(Rs) ->
            [throw("roles must be a JSON list of strings") || R <- Rs, not is_binary(R)],
            Rs;
        _ ->
            throw("roles must be a JSON list of strings")
    end,
    ok;
validate_names_and_roles(_) ->
    throw("admins or members must be a JSON list of strings").

get_revs_limit(#db{} = Db) ->
    couch_db_engine:get_revs_limit(Db).

set_revs_limit(#db{main_pid = Pid} = Db, Limit) when Limit > 0 ->
    check_is_admin(Db),
    gen_server:call(Pid, {set_revs_limit, Limit}, infinity);
set_revs_limit(_Db, _Limit) ->
    throw(invalid_revs_limit).

name(#db{name = Name}) ->
    Name;
name(?OLD_DB_REC = Db) ->
    ?OLD_DB_NAME(Db).

validate_docid(#db{} = Db, DocId) when is_binary(DocId) ->
    couch_doc:validate_docid(DocId, name(Db)),
    case is_partitioned(Db) of
        true ->
            couch_partition:validate_docid(DocId);
        false ->
            ok
    end.

doc_from_json_obj_validate(#db{} = Db, DocJson) ->
    Doc = couch_doc:from_json_obj_validate(DocJson, name(Db)),
    {Props} = DocJson,
    case couch_util:get_value(<<"_id">>, Props) of
        DocId when is_binary(DocId) ->
            % Only validate the docid if it was provided
            validate_docid(Db, DocId);
        _ ->
            ok
    end,
    Doc.

update_doc(Db, Doc, Options) ->
    update_doc(Db, Doc, Options, ?INTERACTIVE_EDIT).

update_doc(Db, Doc, Options, UpdateType) ->
    case update_docs(Db, [Doc], Options, UpdateType) of
        {ok, [{ok, NewRev}]} ->
            {ok, NewRev};
        {ok, [{{_Id, _Rev}, Error}]} ->
            throw(Error);
        {ok, [Error]} ->
            throw(Error);
        {ok, []} ->
            % replication success
            {Pos, [RevId | _]} = Doc#doc.revs,
            {ok, {Pos, RevId}}
    end.

update_docs(Db, Docs) ->
    update_docs(Db, Docs, []).

% group_alike_docs groups the sorted documents into sublist buckets, by id.
% ([DocA, DocA, DocB, DocC], []) -> [[DocA, DocA], [DocB], [DocC]]
group_alike_docs(Docs) ->
    % Here we're just asserting that our doc sort is stable so that
    % if we have duplicate docids we don't have to worry about the
    % behavior of lists:sort/2 which isn't documented anyhwere as
    % being stable.
    WithPos = lists:zip(Docs, lists:seq(1, length(Docs))),
    SortFun = fun({D1, P1}, {D2, P2}) -> {D1#doc.id, P1} =< {D2#doc.id, P2} end,
    SortedDocs = [D || {D, _} <- lists:sort(SortFun, WithPos)],
    group_alike_docs(SortedDocs, []).

group_alike_docs([], Buckets) ->
    lists:reverse(lists:map(fun lists:reverse/1, Buckets));
group_alike_docs([Doc | Rest], []) ->
    group_alike_docs(Rest, [[Doc]]);
group_alike_docs([Doc | Rest], [Bucket | RestBuckets]) ->
    [#doc{id = BucketId} | _] = Bucket,
    case Doc#doc.id == BucketId of
        true ->
            % add to existing bucket
            group_alike_docs(Rest, [[Doc | Bucket] | RestBuckets]);
        false ->
            % add to new bucket
            group_alike_docs(Rest, [[Doc] | [Bucket | RestBuckets]])
    end.

validate_doc_update(#db{} = Db, #doc{id = <<"_design/", _/binary>>} = Doc, _GetDiskDocFun) ->
    case catch check_is_admin(Db) of
        ok -> validate_ddoc(Db, Doc);
        Error -> Error
    end;
validate_doc_update(#db{validate_doc_funs = undefined} = Db, Doc, Fun) ->
    ValidationFuns = load_validation_funs(Db),
    validate_doc_update(Db#db{validate_doc_funs = ValidationFuns}, Doc, Fun);
validate_doc_update(#db{validate_doc_funs = []}, _Doc, _GetDiskDocFun) ->
    ok;
validate_doc_update(_Db, #doc{id = <<"_local/", _/binary>>}, _GetDiskDocFun) ->
    ok;
validate_doc_update(Db, Doc, GetDiskDocFun) ->
    case get(io_priority) of
        {internal_repl, _} ->
            ok;
        _ ->
            validate_doc_update_int(Db, Doc, GetDiskDocFun)
    end.

validate_ddoc(Db, DDoc) ->
    try
        ok = couch_index_server:validate(Db, couch_doc:with_ejson_body(DDoc))
    catch
        throw:{invalid_design_doc, Reason} ->
            {bad_request, invalid_design_doc, Reason};
        throw:{compilation_error, Reason} ->
            {bad_request, compilation_error, Reason};
        throw:Error ->
            Error
    end.

validate_doc_update_int(Db, Doc, GetDiskDocFun) ->
    Fun = fun() ->
        DiskDoc = GetDiskDocFun(),
        JsonCtx = couch_util:json_user_ctx(Db),
        SecObj = get_security(Db),
        try
            [
                case Fun(Doc, DiskDoc, JsonCtx, SecObj) of
                    ok -> ok;
                    Error -> throw(Error)
                end
             || Fun <- Db#db.validate_doc_funs
            ],
            ok
        catch
            throw:Error ->
                Error
        end
    end,
    couch_stats:update_histogram(
        [couchdb, query_server, vdu_process_time],
        Fun
    ).

% to be safe, spawn a middleman here
load_validation_funs(#db{main_pid = Pid, name = <<"shards/", _/binary>>} = Db) ->
    {_, Ref} = spawn_monitor(fun() ->
        exit(ddoc_cache:open(mem3:dbname(Db#db.name), validation_funs))
    end),
    receive
        {'DOWN', Ref, _, _, {ok, Funs}} ->
            gen_server:cast(Pid, {load_validation_funs, Funs}),
            Funs;
        {'DOWN', Ref, _, _, {database_does_not_exist, _StackTrace}} ->
            ok = couch_server:close_db_if_idle(Db#db.name),
            erlang:error(database_does_not_exist);
        {'DOWN', Ref, _, _, Reason} ->
            couch_log:error("could not load validation funs ~p", [Reason]),
            throw(internal_server_error)
    end;
load_validation_funs(#db{main_pid = Pid} = Db) ->
    {ok, DDocInfos} = get_design_docs(Db),
    OpenDocs = fun(#full_doc_info{} = D) ->
        {ok, Doc} = open_doc_int(Db, D, [ejson_body]),
        Doc
    end,
    DDocs = lists:map(OpenDocs, DDocInfos),
    Funs = lists:flatmap(
        fun(DDoc) ->
            case couch_doc:get_validate_doc_fun(Db, DDoc) of
                nil -> [];
                Fun -> [Fun]
            end
        end,
        DDocs
    ),
    gen_server:cast(Pid, {load_validation_funs, Funs}),
    Funs.

reload_validation_funs(#db{} = Db) ->
    gen_server:cast(Db#db.main_pid, {load_validation_funs, undefined}).

prep_and_validate_update(
    Db,
    #doc{id = Id, revs = {RevStart, Revs}} = Doc,
    OldFullDocInfo,
    LeafRevsDict,
    AllowConflict
) ->
    case Revs of
        [PrevRev | _] ->
            case dict:find({RevStart, PrevRev}, LeafRevsDict) of
                {ok, {#leaf{deleted = Deleted, ptr = DiskSp}, DiskRevs}} ->
                    case couch_doc:has_stubs(Doc) of
                        true ->
                            DiskDoc = make_doc(Db, Id, Deleted, DiskSp, DiskRevs),
                            Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
                            {validate_doc_update(Db, Doc2, fun() -> DiskDoc end), Doc2};
                        false ->
                            LoadDiskDoc = fun() -> make_doc(Db, Id, Deleted, DiskSp, DiskRevs) end,
                            {validate_doc_update(Db, Doc, LoadDiskDoc), Doc}
                    end;
                error when AllowConflict ->
                    % will generate error if
                    couch_doc:merge_stubs(Doc, #doc{}),
                    % there are stubs
                    {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
                error ->
                    {conflict, Doc}
            end;
        [] ->
            % new doc, and we have existing revs.
            % reuse existing deleted doc
            if
                OldFullDocInfo#full_doc_info.deleted orelse AllowConflict ->
                    {validate_doc_update(Db, Doc, fun() -> nil end), Doc};
                true ->
                    {conflict, Doc}
            end
    end.

prep_and_validate_updates(
    _Db,
    [],
    [],
    _AllowConflict,
    AccPrepped,
    AccFatalErrors
) ->
    AccPrepped2 = lists:reverse(lists:map(fun lists:reverse/1, AccPrepped)),
    {AccPrepped2, AccFatalErrors};
prep_and_validate_updates(
    Db,
    [DocBucket | RestBuckets],
    [not_found | RestLookups],
    AllowConflict,
    AccPrepped,
    AccErrors
) ->
    % no existing revs are known,
    {PreppedBucket, AccErrors3} = lists:foldl(
        fun(#doc{revs = Revs} = Doc, {AccBucket, AccErrors2}) ->
            case couch_doc:has_stubs(Doc) of
                true ->
                    % will throw exception
                    couch_doc:merge_stubs(Doc, #doc{});
                false ->
                    ok
            end,
            case Revs of
                {0, []} ->
                    case validate_doc_update(Db, Doc, fun() -> nil end) of
                        ok ->
                            {[Doc | AccBucket], AccErrors2};
                        Error ->
                            {AccBucket, [{doc_tag(Doc), Error} | AccErrors2]}
                    end;
                _ ->
                    % old revs specified but none exist, a conflict
                    {AccBucket, [{doc_tag(Doc), conflict} | AccErrors2]}
            end
        end,
        {[], AccErrors},
        DocBucket
    ),

    prep_and_validate_updates(
        Db,
        RestBuckets,
        RestLookups,
        AllowConflict,
        [PreppedBucket | AccPrepped],
        AccErrors3
    );
prep_and_validate_updates(
    Db,
    [DocBucket | RestBuckets],
    [#full_doc_info{rev_tree = OldRevTree} = OldFullDocInfo | RestLookups],
    AllowConflict,
    AccPrepped,
    AccErrors
) ->
    Leafs = couch_key_tree:get_all_leafs(OldRevTree),
    LeafRevsDict = dict:from_list([
        {{Start, RevId}, {Leaf, Revs}}
     || {Leaf, {Start, [RevId | _]} = Revs} <- Leafs
    ]),
    {PreppedBucket, AccErrors3} = lists:foldl(
        fun(Doc, {Docs2Acc, AccErrors2}) ->
            case
                prep_and_validate_update(
                    Db,
                    Doc,
                    OldFullDocInfo,
                    LeafRevsDict,
                    AllowConflict
                )
            of
                {ok, Doc2} ->
                    {[Doc2 | Docs2Acc], AccErrors2};
                {Error, _} ->
                    % Record the error
                    {Docs2Acc, [{doc_tag(Doc), Error} | AccErrors2]}
            end
        end,
        {[], AccErrors},
        DocBucket
    ),
    prep_and_validate_updates(
        Db,
        RestBuckets,
        RestLookups,
        AllowConflict,
        [PreppedBucket | AccPrepped],
        AccErrors3
    ).

update_docs(Db, Docs, Options) ->
    update_docs(Db, Docs, Options, ?INTERACTIVE_EDIT).

prep_and_validate_replicated_updates(_Db, [], [], AccPrepped, AccErrors) ->
    Errors2 = [
        {{Id, {Pos, Rev}}, Error}
     || {#doc{id = Id, revs = {Pos, [Rev | _]}}, Error} <- AccErrors
    ],
    AccPrepped2 = lists:reverse(lists:map(fun lists:reverse/1, AccPrepped)),
    {AccPrepped2, lists:reverse(Errors2)};
prep_and_validate_replicated_updates(
    Db, [Bucket | RestBuckets], [OldInfo | RestOldInfo], AccPrepped, AccErrors
) ->
    case OldInfo of
        not_found ->
            {ValidatedBucket, AccErrors3} = lists:foldl(
                fun(Doc, {AccPrepped2, AccErrors2}) ->
                    case couch_doc:has_stubs(Doc) of
                        true ->
                            % will throw exception
                            couch_doc:merge_stubs(Doc, #doc{});
                        false ->
                            ok
                    end,
                    case validate_doc_update(Db, Doc, fun() -> nil end) of
                        ok ->
                            {[Doc | AccPrepped2], AccErrors2};
                        Error ->
                            {AccPrepped2, [{Doc, Error} | AccErrors2]}
                    end
                end,
                {[], AccErrors},
                Bucket
            ),
            prep_and_validate_replicated_updates(
                Db, RestBuckets, RestOldInfo, [ValidatedBucket | AccPrepped], AccErrors3
            );
        #full_doc_info{rev_tree = OldTree} ->
            OldLeafs = couch_key_tree:get_all_leafs_full(OldTree),
            OldLeafsLU = [{Start, RevId} || {Start, [{RevId, _} | _]} <- OldLeafs],
            NewPaths = lists:map(fun couch_doc:to_path/1, Bucket),
            NewRevTree = couch_key_tree:multi_merge(OldTree, NewPaths),
            Leafs = couch_key_tree:get_all_leafs_full(NewRevTree),
            LeafRevsFullDict = dict:from_list([
                {{Start, RevId}, FullPath}
             || {Start, [{RevId, _} | _]} = FullPath <- Leafs
            ]),
            {ValidatedBucket, AccErrors3} =
                lists:foldl(
                    fun(#doc{id = Id, revs = {Pos, [RevId | _]}} = Doc, {AccValidated, AccErrors2}) ->
                        IsOldLeaf = lists:member({Pos, RevId}, OldLeafsLU),
                        case dict:find({Pos, RevId}, LeafRevsFullDict) of
                            {ok, {Start, Path}} when not IsOldLeaf ->
                                % our unflushed doc is a leaf node. Go back on the path
                                % to find the previous rev that's on disk.

                                LoadPrevRevFun = fun() ->
                                    make_first_doc_on_disk(Db, Id, Start - 1, tl(Path))
                                end,

                                case couch_doc:has_stubs(Doc) of
                                    true ->
                                        DiskDoc =
                                            case LoadPrevRevFun() of
                                                #doc{} = DiskDoc0 ->
                                                    DiskDoc0;
                                                _ ->
                                                    % Force a missing_stub exception
                                                    couch_doc:merge_stubs(Doc, #doc{})
                                            end,
                                        Doc2 = couch_doc:merge_stubs(Doc, DiskDoc),
                                        GetDiskDocFun = fun() -> DiskDoc end;
                                    false ->
                                        Doc2 = Doc,
                                        GetDiskDocFun = LoadPrevRevFun
                                end,

                                case validate_doc_update(Db, Doc2, GetDiskDocFun) of
                                    ok ->
                                        {[Doc2 | AccValidated], AccErrors2};
                                    Error ->
                                        {AccValidated, [{Doc, Error} | AccErrors2]}
                                end;
                            _ ->
                                % this doc isn't a leaf or already exists in the tree.
                                % ignore but consider it a success.
                                {AccValidated, AccErrors2}
                        end
                    end,
                    {[], AccErrors},
                    Bucket
                ),
            prep_and_validate_replicated_updates(
                Db,
                RestBuckets,
                RestOldInfo,
                [ValidatedBucket | AccPrepped],
                AccErrors3
            )
    end.

new_revid(#doc{body = Body, revs = {OldStart, OldRevs}, atts = Atts, deleted = Deleted}) ->
    DigestedAtts = lists:foldl(
        fun(Att, Acc) ->
            [N, T, M] = couch_att:fetch([name, type, md5], Att),
            case M == <<>> of
                true -> Acc;
                false -> [{N, T, M} | Acc]
            end
        end,
        [],
        Atts
    ),
    case DigestedAtts of
        Atts2 when length(Atts) =/= length(Atts2) ->
            % We must have old style non-md5 attachments
            ?l2b(integer_to_list(couch_util:rand32()));
        Atts2 ->
            OldRev =
                case OldRevs of
                    [] -> 0;
                    [OldRev0 | _] -> OldRev0
                end,
            couch_hash:md5_hash(
                term_to_binary([Deleted, OldStart, OldRev, Body, Atts2], [{minor_version, 1}])
            )
    end.

new_revs([], OutBuckets, IdRevsAcc) ->
    {lists:reverse(OutBuckets), IdRevsAcc};
new_revs([Bucket | RestBuckets], OutBuckets, IdRevsAcc) ->
    {NewBucket, IdRevsAcc3} = lists:mapfoldl(
        fun(#doc{revs = {Start, RevIds}} = Doc, IdRevsAcc2) ->
            NewRevId = new_revid(Doc),
            {Doc#doc{revs = {Start + 1, [NewRevId | RevIds]}}, [
                {doc_tag(Doc), {ok, {Start + 1, NewRevId}}} | IdRevsAcc2
            ]}
        end,
        IdRevsAcc,
        Bucket
    ),
    new_revs(RestBuckets, [NewBucket | OutBuckets], IdRevsAcc3).

check_dup_atts(#doc{atts = Atts} = Doc) ->
    lists:foldl(
        fun(Att, Names) ->
            Name = couch_att:fetch(name, Att),
            case ordsets:is_element(Name, Names) of
                true -> throw({bad_request, <<"Duplicate attachments">>});
                false -> ordsets:add_element(Name, Names)
            end
        end,
        ordsets:new(),
        Atts
    ),
    Doc.

tag_docs([]) ->
    [];
tag_docs([#doc{meta = Meta} = Doc | Rest]) ->
    [Doc#doc{meta = [{ref, make_ref()} | Meta]} | tag_docs(Rest)].

doc_tag(#doc{meta = Meta}) ->
    case lists:keyfind(ref, 1, Meta) of
        {ref, Ref} when is_reference(Ref) -> Ref;
        false -> throw(doc_not_tagged);
        Else -> throw({invalid_doc_tag, Else})
    end.

update_docs(Db, Docs0, Options, ?REPLICATED_CHANGES) ->
    Docs = tag_docs(Docs0),

    PrepValidateFun = fun(Db0, DocBuckets0, ExistingDocInfos) ->
        prep_and_validate_replicated_updates(
            Db0,
            DocBuckets0,
            ExistingDocInfos,
            [],
            []
        )
    end,

    {ok, DocBuckets, LocalDocs, DocErrors} =
        before_docs_update(Db, Docs, PrepValidateFun, ?REPLICATED_CHANGES),

    DocBuckets2 = [
        [
            doc_flush_atts(Db, check_dup_atts(Doc))
         || Doc <- Bucket
        ]
     || Bucket <- DocBuckets
    ],
    {ok, _} = write_and_commit(
        Db,
        DocBuckets2,
        LocalDocs,
        [?REPLICATED_CHANGES | Options]
    ),
    {ok, DocErrors};
update_docs(Db, Docs0, Options, ?INTERACTIVE_EDIT) ->
    BlockInteractiveDatabaseWrites = couch_disk_monitor:block_interactive_database_writes(),
    if
        BlockInteractiveDatabaseWrites ->
            {ok, [{insufficient_storage, <<"database_dir is too full">>} || _ <- Docs0]};
        true ->
            update_docs_interactive(Db, Docs0, Options)
    end.

update_docs_interactive(Db, Docs0, Options) ->
    Docs = tag_docs(Docs0),

    AllOrNothing = lists:member(all_or_nothing, Options),
    PrepValidateFun = fun(Db0, DocBuckets0, ExistingDocInfos) ->
        prep_and_validate_updates(
            Db0,
            DocBuckets0,
            ExistingDocInfos,
            AllOrNothing,
            [],
            []
        )
    end,

    {ok, DocBuckets, LocalDocs, DocErrors} =
        before_docs_update(Db, Docs, PrepValidateFun, ?INTERACTIVE_EDIT),

    if
        (AllOrNothing) and (DocErrors /= []) ->
            RefErrorDict = dict:from_list([{doc_tag(Doc), Doc} || Doc <- Docs]),
            {aborted,
                lists:map(
                    fun({Ref, Error}) ->
                        #doc{id = Id, revs = {Start, RevIds}} = dict:fetch(Ref, RefErrorDict),
                        case {Start, RevIds} of
                            {Pos, [RevId | _]} -> {{Id, {Pos, RevId}}, Error};
                            {0, []} -> {{Id, {0, <<>>}}, Error}
                        end
                    end,
                    DocErrors
                )};
        true ->
            Options2 =
                if
                    AllOrNothing -> [?REPLICATED_CHANGES];
                    true -> []
                end ++ Options,
            DocBuckets2 = [
                [
                    doc_flush_atts(
                        Db,
                        set_new_att_revpos(
                            check_dup_atts(Doc)
                        )
                    )
                 || Doc <- B
                ]
             || B <- DocBuckets
            ],
            {DocBuckets3, IdRevs} = new_revs(DocBuckets2, [], []),

            {ok, CommitResults} = write_and_commit(
                Db,
                DocBuckets3,
                LocalDocs,
                Options2
            ),

            ResultsDict = lists:foldl(
                fun({Key, Resp}, ResultsAcc) ->
                    dict:store(Key, Resp, ResultsAcc)
                end,
                dict:from_list(IdRevs),
                CommitResults ++ DocErrors
            ),
            {ok,
                lists:map(
                    fun(Doc) ->
                        dict:fetch(doc_tag(Doc), ResultsDict)
                    end,
                    Docs
                )}
    end.

% Returns the first available document on disk. Input list is a full rev path
% for the doc.
make_first_doc_on_disk(_Db, _Id, _Pos, []) ->
    nil;
make_first_doc_on_disk(Db, Id, Pos, [{_Rev, #doc{}} | RestPath]) ->
    make_first_doc_on_disk(Db, Id, Pos - 1, RestPath);
make_first_doc_on_disk(Db, Id, Pos, [{_Rev, ?REV_MISSING} | RestPath]) ->
    make_first_doc_on_disk(Db, Id, Pos - 1, RestPath);
make_first_doc_on_disk(Db, Id, Pos, [{_Rev, #leaf{deleted = IsDel, ptr = Sp}} | _] = DocPath) ->
    Revs = [Rev || {Rev, _} <- DocPath],
    make_doc(Db, Id, IsDel, Sp, {Pos, Revs}).

collect_results_with_metrics(Pid, MRef, []) ->
    Begin = os:timestamp(),
    try
        collect_results(Pid, MRef, [])
    after
        ResultsTime = timer:now_diff(os:timestamp(), Begin) div 1000,
        couch_stats:update_histogram(
            [couchdb, collect_results_time],
            ResultsTime
        )
    end.

collect_results(Pid, MRef, ResultsAcc) ->
    receive
        {result, Pid, Result} ->
            collect_results(Pid, MRef, [Result | ResultsAcc]);
        {done, Pid} ->
            {ok, ResultsAcc};
        {retry, Pid} ->
            retry;
        {'DOWN', MRef, _, _, Reason} ->
            exit(Reason)
    end.

write_and_commit(
    #db{main_pid = Pid, user_ctx = Ctx} = Db,
    DocBuckets1,
    LocalDocs,
    Options
) ->
    DocBuckets = prepare_doc_summaries(Db, DocBuckets1),
    ReplicatedChanges = lists:member(?REPLICATED_CHANGES, Options),
    MRef = erlang:monitor(process, Pid),
    try
        Pid ! {update_docs, self(), DocBuckets, LocalDocs, ReplicatedChanges},
        case collect_results_with_metrics(Pid, MRef, []) of
            {ok, Results} ->
                {ok, Results};
            retry ->
                % This can happen if the db file we wrote to was swapped out by
                % compaction. Retry by reopening the db and writing to the current file
                {ok, Db2} = open(Db#db.name, [{user_ctx, Ctx}]),
                DocBuckets2 = [
                    [doc_flush_atts(Db2, Doc) || Doc <- Bucket]
                 || Bucket <- DocBuckets1
                ],
                % We only retry once
                DocBuckets3 = prepare_doc_summaries(Db2, DocBuckets2),
                close(Db2),
                Pid ! {update_docs, self(), DocBuckets3, LocalDocs, ReplicatedChanges},
                case collect_results_with_metrics(Pid, MRef, []) of
                    {ok, Results} -> {ok, Results};
                    retry -> throw({update_error, compaction_retry})
                end
        end
    after
        erlang:demonitor(MRef, [flush])
    end.

prepare_doc_summaries(Db, BucketList) ->
    [
        lists:map(
            fun(#doc{body = Body, atts = Atts} = Doc0) ->
                DiskAtts = [couch_att:to_disk_term(Att) || Att <- Atts],
                {ok, SizeInfo} = couch_att:size_info(Atts),
                AttsStream =
                    case Atts of
                        [Att | _] ->
                            {stream, StreamEngine} = couch_att:fetch(data, Att),
                            StreamEngine;
                        [] ->
                            nil
                    end,
                Doc1 = Doc0#doc{
                    atts = DiskAtts,
                    meta =
                        [
                            {size_info, SizeInfo},
                            {atts_stream, AttsStream},
                            {ejson_size, couch_ejson_size:encoded_size(Body)}
                        ] ++ Doc0#doc.meta
                },
                couch_db_engine:serialize_doc(Db, Doc1)
            end,
            Bucket
        )
     || Bucket <- BucketList
    ].

before_docs_update(#db{validate_doc_funs = VDFuns} = Db, Docs, PVFun, UpdateType) ->
    increment_stat(Db, [couchdb, database_writes]),

    % Separate _local docs from normal docs
    IsLocal = fun
        (#doc{id = <<?LOCAL_DOC_PREFIX, _/binary>>}) -> true;
        (_) -> false
    end,
    {LocalDocs, Docs2} = lists:partition(IsLocal, Docs),

    BucketList = group_alike_docs(Docs2),

    DocBuckets = lists:map(
        fun(Bucket) ->
            lists:map(
                fun(Doc) ->
                    DocWithBody = couch_doc:with_ejson_body(Doc),
                    couch_db_plugin:before_doc_update(Db, DocWithBody, UpdateType)
                end,
                Bucket
            )
        end,
        BucketList
    ),

    ValidatePred = fun
        (#doc{id = <<?DESIGN_DOC_PREFIX, _/binary>>}) -> true;
        (#doc{atts = Atts}) -> Atts /= []
    end,

    case (VDFuns /= []) orelse lists:any(ValidatePred, Docs2) of
        true ->
            % lookup the doc by id and get the most recent
            Ids = [Id || [#doc{id = Id} | _] <- DocBuckets],
            ExistingDocs = get_full_doc_infos(Db, Ids),
            {DocBuckets2, DocErrors} = PVFun(Db, DocBuckets, ExistingDocs),
            % remove empty buckets
            DocBuckets3 = [Bucket || Bucket <- DocBuckets2, Bucket /= []],
            {ok, DocBuckets3, LocalDocs, DocErrors};
        false ->
            {ok, DocBuckets, LocalDocs, []}
    end.

set_new_att_revpos(#doc{revs = {RevPos, _Revs}, atts = Atts0} = Doc) ->
    Atts = lists:map(
        fun(Att) ->
            case couch_att:fetch(data, Att) of
                % already commited to disk, don't set new rev
                {stream, _} -> Att;
                {Fd, _} when is_pid(Fd) -> Att;
                % write required so update RevPos
                _ -> couch_att:store(revpos, RevPos + 1, Att)
            end
        end,
        Atts0
    ),
    Doc#doc{atts = Atts}.

doc_flush_atts(Db, Doc) ->
    Doc#doc{atts = [couch_att:flush(Db, Att) || Att <- Doc#doc.atts]}.

compressible_att_type(MimeType) when is_binary(MimeType) ->
    compressible_att_type(?b2l(MimeType));
compressible_att_type(MimeType) ->
    TypeExpList = re:split(
        config:get(
            "attachments",
            "compressible_types",
            ?DEFAULT_COMPRESSIBLE_TYPES
        ),
        "\\s*,\\s*",
        [{return, list}]
    ),
    lists:any(
        fun(TypeExp) ->
            Regexp = [
                "^\\s*",
                re:replace(TypeExp, "\\*", ".*"),
                "(?:\\s*;.*?)?\\s*",
                $$
            ],
            re:run(MimeType, Regexp, [caseless]) =/= nomatch
        end,
        [T || T <- TypeExpList, T /= []]
    ).

with_stream(Db, Att, Fun) ->
    [InMd5, Type, Enc] = couch_att:fetch([md5, type, encoding], Att),
    BufferSize = config:get_integer(
        "couchdb",
        "attachment_stream_buffer_size",
        4096
    ),
    Options =
        case (Enc =:= identity) andalso compressible_att_type(Type) of
            true ->
                CompLevel = config:get_integer(
                    "attachments", "compression_level", 8
                ),
                [
                    {buffer_size, BufferSize},
                    {encoding, gzip},
                    {compression_level, CompLevel}
                ];
            _ ->
                [{buffer_size, BufferSize}]
        end,
    {ok, OutputStream} = open_write_stream(Db, Options),
    Fun(OutputStream),
    {StreamEngine, Len, IdentityLen, Md5, IdentityMd5} =
        couch_stream:close(OutputStream),
    couch_util:check_md5(IdentityMd5, InMd5),
    {AttLen, DiskLen, NewEnc} =
        case Enc of
            identity ->
                case {Md5, IdentityMd5} of
                    {Same, Same} ->
                        {Len, IdentityLen, identity};
                    _ ->
                        {Len, IdentityLen, gzip}
                end;
            gzip ->
                case couch_att:fetch([att_len, disk_len], Att) of
                    [AL, DL] when AL =:= undefined orelse DL =:= undefined ->
                        % Compressed attachment uploaded through the standalone API.
                        {Len, Len, gzip};
                    [AL, DL] ->
                        % This case is used for efficient push-replication, where a
                        % compressed attachment is located in the body of multipart
                        % content-type request.
                        {AL, DL, gzip}
                end
        end,
    couch_att:store(
        [
            {data, {stream, StreamEngine}},
            {att_len, AttLen},
            {disk_len, DiskLen},
            {md5, Md5},
            {encoding, NewEnc}
        ],
        Att
    ).

open_write_stream(Db, Options) ->
    couch_db_engine:open_write_stream(Db, Options).

open_read_stream(Db, AttState) ->
    couch_db_engine:open_read_stream(Db, AttState).

is_active_stream(Db, StreamEngine) ->
    couch_db_engine:is_active_stream(Db, StreamEngine).

calculate_start_seq(_Db, _Node, Seq) when is_integer(Seq) ->
    Seq;
calculate_start_seq(Db, Node, {Seq, Uuid}) ->
    % Treat the current node as the epoch node
    calculate_start_seq(Db, Node, {Seq, Uuid, Node});
calculate_start_seq(Db, _Node, {Seq, {split, Uuid}, EpochNode}) ->
    case is_owner(EpochNode, Seq, get_epochs(Db)) of
        true ->
            % Find last replicated sequence from split source to target
            mem3_rep:find_split_target_seq(Db, EpochNode, Uuid, Seq);
        false ->
            couch_log:warning(
                "~p calculate_start_seq not owner "
                "db: ~p, seq: ~p, uuid: ~p, epoch_node: ~p, epochs: ~p",
                [?MODULE, Db#db.name, Seq, Uuid, EpochNode, get_epochs(Db)]
            ),
            0
    end;
calculate_start_seq(Db, Node, {Seq, Uuid, EpochNode}) ->
    case is_prefix(Uuid, get_uuid(Db)) of
        true ->
            case is_owner(EpochNode, Seq, get_epochs(Db)) of
                true ->
                    Seq;
                false ->
                    %% Shard might have been moved from another node. We
                    %% matched the uuid already, try to find last viable
                    %% sequence we can use
                    couch_log:warning(
                        "~p calculate_start_seq not owner, "
                        " trying replacement db: ~p, seq: ~p, uuid: ~p, "
                        "epoch_node: ~p, epochs: ~p",
                        [
                            ?MODULE,
                            Db#db.name,
                            Seq,
                            Uuid,
                            EpochNode,
                            get_epochs(Db)
                        ]
                    ),
                    calculate_start_seq(Db, Node, {replace, EpochNode, Uuid, Seq})
            end;
        false ->
            couch_log:warning(
                "~p calculate_start_seq uuid prefix mismatch "
                "db: ~p, seq: ~p, uuid: ~p, epoch_node: ~p",
                [?MODULE, Db#db.name, Seq, Uuid, EpochNode]
            ),
            %% The file was rebuilt, most likely in a different
            %% order, so rewind.
            0
    end;
calculate_start_seq(Db, _Node, {replace, OriginalNode, Uuid, Seq}) ->
    case is_prefix(Uuid, couch_db:get_uuid(Db)) of
        true ->
            try
                start_seq(get_epochs(Db), OriginalNode, Seq)
            catch
                throw:epoch_mismatch ->
                    couch_log:warning(
                        "~p start_seq duplicate uuid on node: ~p "
                        "db: ~p, seq: ~p, uuid: ~p, epoch_node: ~p",
                        [?MODULE, node(), Db#db.name, Seq, Uuid, OriginalNode]
                    ),
                    0
            end;
        false ->
            {replace, OriginalNode, Uuid, Seq}
    end.

validate_epochs(Epochs) ->
    %% Assert uniqueness.
    case length(Epochs) == length(lists:ukeysort(2, Epochs)) of
        true -> ok;
        false -> erlang:error(duplicate_epoch)
    end,
    %% Assert order.
    case Epochs == lists:sort(fun({_, A}, {_, B}) -> B =< A end, Epochs) of
        true -> ok;
        false -> erlang:error(epoch_order)
    end.

is_prefix(Pattern, Subject) ->
    binary:longest_common_prefix([Pattern, Subject]) == size(Pattern).

is_owner(Node, Seq, Epochs) ->
    Node =:= owner_of(Epochs, Seq).

owner_of(Db, Seq) when not is_list(Db) ->
    owner_of(get_epochs(Db), Seq);
owner_of([], _Seq) ->
    undefined;
owner_of([{EpochNode, EpochSeq} | _Rest], Seq) when Seq >= EpochSeq ->
    EpochNode;
owner_of([_ | Rest], Seq) ->
    owner_of(Rest, Seq).

start_seq([{OrigNode, EpochSeq} | _], OrigNode, Seq) when Seq >= EpochSeq ->
    %% OrigNode is the owner of the Seq so we can safely stream from there
    Seq;
start_seq([{_, NewSeq}, {OrigNode, _} | _], OrigNode, Seq) when Seq >= NewSeq ->
    %% We transferred this file before Seq was written on OrigNode, so we need
    %% to stream from the beginning of the next epoch. Note that it is _not_
    %% necessary for the current node to own the epoch beginning at NewSeq
    NewSeq;
start_seq([_ | Rest], OrigNode, Seq) ->
    start_seq(Rest, OrigNode, Seq);
start_seq([], _OrigNode, _Seq) ->
    throw(epoch_mismatch).

fold_docs(Db, UserFun, UserAcc) ->
    fold_docs(Db, UserFun, UserAcc, []).

fold_docs(Db, UserFun, UserAcc, Options) ->
    couch_db_engine:fold_docs(Db, UserFun, UserAcc, Options).

fold_local_docs(Db, UserFun, UserAcc, Options) ->
    couch_db_engine:fold_local_docs(Db, UserFun, UserAcc, Options).

fold_design_docs(Db, UserFun, UserAcc, Options1) ->
    Options2 = set_design_doc_keys(Options1),
    couch_db_engine:fold_docs(Db, UserFun, UserAcc, Options2).

fold_changes(Db, StartSeq, UserFun, UserAcc) ->
    fold_changes(Db, StartSeq, UserFun, UserAcc, []).

fold_changes(Db, StartSeq, UserFun, UserAcc, Opts) ->
    couch_db_engine:fold_changes(Db, StartSeq, UserFun, UserAcc, Opts).

fold_purge_infos(Db, StartPurgeSeq, Fun, Acc) ->
    fold_purge_infos(Db, StartPurgeSeq, Fun, Acc, []).

fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts) ->
    couch_db_engine:fold_purge_infos(Db, StartPurgeSeq, UFun, UAcc, Opts).

count_changes_since(Db, SinceSeq) ->
    couch_db_engine:count_changes_since(Db, SinceSeq).

%%% Internal function %%%
open_doc_revs_int(Db, IdRevs, Options) ->
    Ids = [Id || {Id, _Revs} <- IdRevs],
    LookupResults = get_full_doc_infos(Db, Ids),
    lists:zipwith(
        fun({Id, Revs}, Lookup) ->
            case Lookup of
                #full_doc_info{rev_tree = RevTree} ->
                    {FoundRevs, MissingRevs} =
                        case Revs of
                            all ->
                                {couch_key_tree:get_all_leafs(RevTree), []};
                            _ ->
                                case lists:member(latest, Options) of
                                    true ->
                                        couch_key_tree:get_key_leafs(RevTree, Revs);
                                    false ->
                                        couch_key_tree:get(RevTree, Revs)
                                end
                        end,
                    FoundResults =
                        lists:map(
                            fun({Value, {Pos, [Rev | _]} = FoundRevPath}) ->
                                case Value of
                                    ?REV_MISSING ->
                                        % we have the rev in our list but know nothing about it
                                        {{not_found, missing}, {Pos, Rev}};
                                    #leaf{deleted = IsDeleted, ptr = SummaryPtr} ->
                                        {ok, make_doc(Db, Id, IsDeleted, SummaryPtr, FoundRevPath)}
                                end
                            end,
                            FoundRevs
                        ),
                    Results =
                        FoundResults ++
                            [{{not_found, missing}, MissingRev} || MissingRev <- MissingRevs],
                    {ok, Results};
                not_found when Revs == all ->
                    {ok, []};
                not_found ->
                    {ok, [{{not_found, missing}, Rev} || Rev <- Revs]}
            end
        end,
        IdRevs,
        LookupResults
    ).

open_doc_int(Db, <<?LOCAL_DOC_PREFIX, _/binary>> = Id, Options) ->
    case couch_db_engine:open_local_docs(Db, [Id]) of
        [#doc{} = Doc] ->
            apply_open_options({ok, Doc}, Options);
        [not_found] ->
            {not_found, missing}
    end;
open_doc_int(Db, #doc_info{id = Id, revs = [RevInfo | _]} = DocInfo, Options) ->
    #rev_info{deleted = IsDeleted, rev = {Pos, RevId}, body_sp = Bp} = RevInfo,
    Doc = make_doc(Db, Id, IsDeleted, Bp, {Pos, [RevId]}),
    apply_open_options(
        {ok, Doc#doc{meta = doc_meta_info(DocInfo, [], Options)}}, Options
    );
open_doc_int(Db, #full_doc_info{id = Id, rev_tree = RevTree} = FullDocInfo, Options) ->
    #doc_info{revs = [#rev_info{deleted = IsDeleted, rev = Rev, body_sp = Bp} | _]} =
        DocInfo = couch_doc:to_doc_info(FullDocInfo),
    {[{_, RevPath}], []} = couch_key_tree:get(RevTree, [Rev]),
    Doc = make_doc(Db, Id, IsDeleted, Bp, RevPath),
    apply_open_options(
        {ok, Doc#doc{meta = doc_meta_info(DocInfo, RevTree, Options)}}, Options
    );
open_doc_int(Db, Id, Options) ->
    case get_full_doc_info(Db, Id) of
        #full_doc_info{} = FullDocInfo ->
            open_doc_int(Db, FullDocInfo, Options);
        not_found ->
            {not_found, missing}
    end.

doc_meta_info(
    #doc_info{high_seq = Seq, revs = [#rev_info{rev = Rev} | RestInfo]}, RevTree, Options
) ->
    case lists:member(revs_info, Options) of
        false ->
            [];
        true ->
            {[{Pos, RevPath}], []} =
                couch_key_tree:get_full_key_paths(RevTree, [Rev]),

            [
                {revs_info, Pos,
                    lists:map(
                        fun
                            ({Rev1, ?REV_MISSING}) ->
                                {Rev1, missing};
                            ({Rev1, Leaf}) ->
                                case Leaf#leaf.deleted of
                                    true ->
                                        {Rev1, deleted};
                                    false ->
                                        {Rev1, available}
                                end
                        end,
                        RevPath
                    )}
            ]
    end ++
        case lists:member(conflicts, Options) of
            false ->
                [];
            true ->
                case [Rev1 || #rev_info{rev = Rev1, deleted = false} <- RestInfo] of
                    [] -> [];
                    ConflictRevs -> [{conflicts, ConflictRevs}]
                end
        end ++
        case lists:member(deleted_conflicts, Options) of
            false ->
                [];
            true ->
                case [Rev1 || #rev_info{rev = Rev1, deleted = true} <- RestInfo] of
                    [] -> [];
                    DelConflictRevs -> [{deleted_conflicts, DelConflictRevs}]
                end
        end ++
        case lists:member(local_seq, Options) of
            false -> [];
            true -> [{local_seq, Seq}]
        end.

make_doc(_Db, Id, Deleted, nil = _Bp, RevisionPath) ->
    #doc{
        id = Id,
        revs = RevisionPath,
        body = [],
        atts = [],
        deleted = Deleted
    };
make_doc(#db{} = Db, Id, Deleted, Bp, {Pos, Revs}) ->
    RevsLimit = get_revs_limit(Db),
    Doc0 = couch_db_engine:read_doc_body(Db, #doc{
        id = Id,
        revs = {Pos, lists:sublist(Revs, 1, RevsLimit)},
        body = Bp,
        deleted = Deleted
    }),
    Doc1 =
        case Doc0#doc.atts of
            BinAtts when is_binary(BinAtts) ->
                Doc0#doc{
                    atts = couch_compress:decompress(BinAtts)
                };
            ListAtts when is_list(ListAtts) ->
                Doc0
        end,
    after_doc_read(Db, Doc1#doc{
        atts = [couch_att:from_disk_term(Db, T) || T <- Doc1#doc.atts]
    }).

after_doc_read(#db{} = Db, Doc) ->
    DocWithBody = couch_doc:with_ejson_body(Doc),
    couch_db_plugin:after_doc_read(Db, DocWithBody).

increment_stat(#db{} = Db, Stat) ->
    increment_stat(Db, Stat, 1).

increment_stat(#db{options = Options}, Stat, Count) when
    is_integer(Count), Count >= 0
->
    case lists:member(sys_db, Options) of
        true ->
            ok;
        false ->
            couch_stats:increment_counter(Stat, Count)
    end.

-spec normalize_dbname(list() | binary()) -> binary().

normalize_dbname(DbName) when is_list(DbName) ->
    normalize_dbname(list_to_binary(DbName));
normalize_dbname(DbName) when is_binary(DbName) ->
    mem3:dbname(couch_util:drop_dot_couch_ext(DbName)).

-spec dbname_suffix(list() | binary()) -> binary().

dbname_suffix(DbName) ->
    filename:basename(normalize_dbname(DbName)).

validate_dbname(DbName) when is_list(DbName) ->
    validate_dbname(?l2b(DbName));
validate_dbname(DbName) when is_binary(DbName) ->
    Normalized = normalize_dbname(DbName),
    couch_db_plugin:validate_dbname(
        DbName, Normalized, fun validate_dbname_int/2
    ).

validate_dbname_int(DbName, Normalized) when is_binary(DbName) ->
    DbNoExt = couch_util:drop_dot_couch_ext(DbName),
    case re:run(DbNoExt, ?DBNAME_REGEX, [{capture, none}, dollar_endonly]) of
        match ->
            ok;
        nomatch ->
            case is_system_db_name(Normalized) of
                true -> ok;
                false -> {error, {illegal_database_name, DbName}}
            end
    end.

is_system_db_name(DbName) when is_list(DbName) ->
    is_system_db_name(?l2b(DbName));
is_system_db_name(DbName) when is_binary(DbName) ->
    Normalized = normalize_dbname(DbName),
    Suffix = filename:basename(Normalized),
    case {filename:dirname(Normalized), lists:member(Suffix, ?SYSTEM_DATABASES)} of
        {<<".">>, Result} ->
            Result;
        {_Prefix, false} ->
            false;
        {Prefix, true} ->
            ReOpts = [{capture, none}, dollar_endonly],
            re:run(Prefix, ?DBNAME_REGEX, ReOpts) == match
    end.

set_design_doc_keys(Options1) ->
    Dir =
        case lists:keyfind(dir, 1, Options1) of
            {dir, D0} -> D0;
            _ -> fwd
        end,
    Options2 = set_design_doc_start_key(Options1, Dir),
    set_design_doc_end_key(Options2, Dir).

-define(FIRST_DDOC_KEY, <<"_design/">>).
-define(LAST_DDOC_KEY, <<"_design0">>).

set_design_doc_start_key(Options, fwd) ->
    Key1 = couch_util:get_value(start_key, Options, ?FIRST_DDOC_KEY),
    Key2 =
        case Key1 < ?FIRST_DDOC_KEY of
            true -> ?FIRST_DDOC_KEY;
            false -> Key1
        end,
    lists:keystore(start_key, 1, Options, {start_key, Key2});
set_design_doc_start_key(Options, rev) ->
    Key1 = couch_util:get_value(start_key, Options, ?LAST_DDOC_KEY),
    Key2 =
        case Key1 > ?LAST_DDOC_KEY of
            true -> ?LAST_DDOC_KEY;
            false -> Key1
        end,
    lists:keystore(start_key, 1, Options, {start_key, Key2}).

set_design_doc_end_key(Options, fwd) ->
    case couch_util:get_value(end_key_gt, Options) of
        undefined ->
            Key1 = couch_util:get_value(end_key, Options, ?LAST_DDOC_KEY),
            Key2 =
                case Key1 > ?LAST_DDOC_KEY of
                    true -> ?LAST_DDOC_KEY;
                    false -> Key1
                end,
            lists:keystore(end_key, 1, Options, {end_key, Key2});
        EKeyGT ->
            Key2 =
                case EKeyGT > ?LAST_DDOC_KEY of
                    true -> ?LAST_DDOC_KEY;
                    false -> EKeyGT
                end,
            lists:keystore(end_key_gt, 1, Options, {end_key_gt, Key2})
    end;
set_design_doc_end_key(Options, rev) ->
    case couch_util:get_value(end_key_gt, Options) of
        undefined ->
            Key1 = couch_util:get_value(end_key, Options, ?LAST_DDOC_KEY),
            Key2 =
                case Key1 < ?FIRST_DDOC_KEY of
                    true -> ?FIRST_DDOC_KEY;
                    false -> Key1
                end,
            lists:keystore(end_key, 1, Options, {end_key, Key2});
        EKeyGT ->
            Key2 =
                case EKeyGT < ?FIRST_DDOC_KEY of
                    true -> ?FIRST_DDOC_KEY;
                    false -> EKeyGT
                end,
            lists:keystore(end_key_gt, 1, Options, {end_key_gt, Key2})
    end.

possible_ancestors(_FullInfo, []) ->
    [];
possible_ancestors(FullInfo, MissingRevs) ->
    #doc_info{revs = RevsInfo} = couch_doc:to_doc_info(FullInfo),
    LeafRevs = [Rev || #rev_info{rev = Rev} <- RevsInfo],
    % Find the revs that are possible ancestors of this rev. A leaf is
    % a possible ancestor if its position is less than any of the
    % missing revs, and if it is less than any, it means it is also
    % less than the maximum missing rev, so we just compare against
    % that.
    {MaxMissingPos, _} = lists:max(MissingRevs),
    lists:filter(fun({Pos, _}) -> Pos < MaxMissingPos end, LeafRevs).

purge_client_warn_lag(Updated, DbName, DocId) when is_integer(Updated) ->
    % Warn about clients that have not updated their purge
    % checkpoints in the last "index_lag_warn_seconds".
    Lag = config:get_integer("purge", "index_lag_warn_seconds", ?PURGE_LAG_SEC),
    Diff = erlang:system_time(second) - Updated,
    case Diff =< Lag of
        true ->
            ok;
        false ->
            Log =
                "~p : Purge checkpoint ~p : ~s not updated in ~p seconds. "
                "Expected update interval is at least ~p seconds",
            couch_log:error(Log, [?MODULE, DbName, DocId, Diff, Lag])
    end;
purge_client_warn_lag(Updated, DbName, DocId) ->
    Log = "~p : Purge checkpoint ~p : ~s has an invalid updated value: ~p",
    couch_log:error(Log, [?MODULE, DbName, DocId, Updated]).

-ifdef(TEST).

-include_lib("couch/include/couch_eunit.hrl").

setup_all() ->
    ok = meck:new(couch_epi, [passthrough]),
    ok = meck:expect(couch_epi, decide, fun(_, _, _, _, _) -> no_decision end),
    ok.

teardown_all(_) ->
    meck:unload().

setup() ->
    meck:reset([couch_epi]).

teardown(_) ->
    ok.

validate_dbname_success_test_() ->
    Cases =
        generate_cases_with_shards("long/co$mplex-/path+/something") ++
            generate_cases_with_shards("something") ++
            lists:append(
                [
                    generate_cases_with_shards(?b2l(SystemDb))
                 || SystemDb <- ?SYSTEM_DATABASES
                ]
            ),
    {
        setup,
        fun setup_all/0,
        fun teardown_all/1,
        {
            foreach,
            fun setup/0,
            fun teardown/1,
            [should_pass_validate_dbname(A) || {_, A} <- Cases]
        }
    }.

validate_dbname_fail_test_() ->
    Cases =
        generate_cases("_long/co$mplex-/path+/_something") ++
            generate_cases("_something") ++
            generate_cases_with_shards("long/co$mplex-/path+/_something#") ++
            generate_cases_with_shards("long/co$mplex-/path+/some.thing") ++
            generate_cases("!abcdefg/werwej/_users") ++
            generate_cases_with_shards("!abcdefg/werwej/_users"),
    {
        setup,
        fun setup_all/0,
        fun teardown_all/1,
        {
            foreach,
            fun setup/0,
            fun teardown/1,
            [should_fail_validate_dbname(A) || {_, A} <- Cases]
        }
    }.

normalize_dbname_test_() ->
    Cases =
        generate_cases_with_shards("long/co$mplex-/path+/_something") ++
            generate_cases_with_shards("_something"),
    WithExpected = [{?l2b(filename:rootname(A)), B} || {A, B} <- Cases],
    [
        {test_name({Expected, Db}), ?_assertEqual(Expected, normalize_dbname(Db))}
     || {Expected, Db} <- WithExpected
    ].

dbname_suffix_test_() ->
    Cases =
        generate_cases_with_shards("long/co$mplex-/path+/_something") ++
            generate_cases_with_shards("_something"),
    WithExpected = [{?l2b(filename:basename(Arg)), Db} || {Arg, Db} <- Cases],
    [
        {test_name({Expected, Db}), ?_assertEqual(Expected, dbname_suffix(Db))}
     || {Expected, Db} <- WithExpected
    ].

is_system_db_name_test_() ->
    Cases = lists:append(
        [
            generate_cases_with_shards("long/co$mplex-/path+/" ++ ?b2l(Db))
         || Db <- ?SYSTEM_DATABASES
        ] ++
            [generate_cases_with_shards(?b2l(Db)) || Db <- ?SYSTEM_DATABASES]
    ),
    WithExpected = [
        {?l2b(filename:basename(filename:rootname(Arg))), Db}
     || {Arg, Db} <- Cases
    ],
    [
        {test_name({Expected, Db}) ++ " in ?SYSTEM_DATABASES", ?_assert(is_system_db_name(Db))}
     || {Expected, Db} <- WithExpected
    ].

should_pass_validate_dbname(DbName) ->
    {test_name(DbName), ?_assertEqual(ok, validate_dbname(DbName))}.

should_fail_validate_dbname(DbName) ->
    {
        test_name(DbName),
        ?_test(begin
            Result = validate_dbname(DbName),
            ?assertMatch({error, {illegal_database_name, _}}, Result),
            {error, {illegal_database_name, FailedDbName}} = Result,
            ?assertEqual(to_binary(DbName), FailedDbName),
            ok
        end)
    }.

calculate_start_seq_test_() ->
    {
        setup,
        fun setup_start_seq_all/0,
        fun teardown_start_seq_all/1,
        {
            foreach,
            fun setup_start_seq/0,
            fun teardown_start_seq/1,
            [
                t_calculate_start_seq_uuid_mismatch(),
                t_calculate_start_seq_is_owner(),
                t_calculate_start_seq_not_owner(),
                t_calculate_start_seq_raw(),
                t_calculate_start_seq_epoch_mismatch(),
                t_calculate_start_seq_shard_move()
            ]
        }
    }.

setup_start_seq_all() ->
    meck:new(couch_db_engine, [passthrough]),
    meck:expect(couch_db_engine, get_uuid, fun(_) -> <<"foo">> end),
    ok = meck:expect(couch_log, warning, 2, ok),
    Epochs = [
        {node2, 10},
        {node1, 1}
    ],
    meck:expect(couch_db_engine, get_epochs, fun(_) -> Epochs end).

teardown_start_seq_all(_) ->
    meck:unload().

setup_start_seq() ->
    meck:reset([
        couch_db_engine,
        couch_log
    ]).

teardown_start_seq(_) ->
    ok.

t_calculate_start_seq_uuid_mismatch() ->
    ?_test(begin
        Db = test_util:fake_db([]),
        Seq = calculate_start_seq(Db, node2, {15, <<"baz">>}),
        ?assertEqual(0, Seq)
    end).

t_calculate_start_seq_is_owner() ->
    ?_test(begin
        Db = test_util:fake_db([]),
        Seq = calculate_start_seq(Db, node2, {15, <<"foo">>}),
        ?assertEqual(15, Seq)
    end).

t_calculate_start_seq_not_owner() ->
    ?_test(begin
        Db = test_util:fake_db([]),
        Seq = calculate_start_seq(Db, node3, {15, <<"foo">>}),
        ?assertEqual(0, Seq)
    end).

t_calculate_start_seq_raw() ->
    ?_test(begin
        Db = test_util:fake_db([]),
        Seq = calculate_start_seq(Db, node1, 13),
        ?assertEqual(13, Seq)
    end).

t_calculate_start_seq_epoch_mismatch() ->
    ?_test(begin
        Db = test_util:fake_db([]),
        SeqIn = {replace, not_this_node, get_uuid(Db), 42},
        Seq = calculate_start_seq(Db, node1, SeqIn),
        ?assertEqual(0, Seq)
    end).

t_calculate_start_seq_shard_move() ->
    ?_test(begin
        Db = test_util:fake_db([]),
        % Sequence when shard was on node1
        ?assertEqual(2, calculate_start_seq(Db, node1, {2, <<"foo">>})),
        % Shard moved to node2 with no other updates after the move to node2
        ?assertEqual(10, calculate_start_seq(Db, node2, {10, <<"foo">>})),
        % Sequence from node1 after the move happened, we reset back to the
        % start of the epoch on node2 = 10
        ?assertEqual(10, calculate_start_seq(Db, node1, {16, <<"foo">>})),
        % Invalid node, epoch mismatch, start at 0
        ?assertEqual(0, calculate_start_seq(Db, node3, {16, <<"foo">>}))
    end).

is_owner_test() ->
    ?assertNot(is_owner(foo, 1, [])),
    ?assertNot(is_owner(foo, 1, [{foo, 2}])),
    ?assert(is_owner(foo, 1, [{foo, 1}])),
    ?assert(is_owner(foo, 2, [{foo, 1}])),
    ?assert(is_owner(foo, 50, [{bar, 100}, {foo, 1}])),
    ?assert(is_owner(foo, 50, [{baz, 200}, {bar, 100}, {foo, 1}])),
    ?assert(is_owner(bar, 150, [{baz, 200}, {bar, 100}, {foo, 1}])),
    ?assert(is_owner(bar, 100, [{baz, 200}, {bar, 100}, {foo, 1}])),
    ?assertNot(is_owner(bar, 99, [{baz, 200}, {bar, 100}, {foo, 1}])),
    ?assertNot(is_owner(baz, 199, [{baz, 200}, {bar, 100}, {foo, 1}])),
    ?assertError(duplicate_epoch, validate_epochs([{foo, 1}, {bar, 1}])),
    ?assertError(epoch_order, validate_epochs([{foo, 100}, {bar, 200}])).

to_binary(DbName) when is_list(DbName) ->
    ?l2b(DbName);
to_binary(DbName) when is_binary(DbName) ->
    DbName.

test_name({Expected, DbName}) ->
    lists:flatten(io_lib:format("~p -> ~p", [DbName, Expected]));
test_name(DbName) ->
    lists:flatten(io_lib:format("~p", [DbName])).

generate_cases_with_shards(DbName) ->
    DbNameWithShard = add_shard(DbName),
    DbNameWithShardAndExtension = add_shard(DbName) ++ ".couch",
    Cases = [
        DbName,
        ?l2b(DbName),
        DbNameWithShard,
        ?l2b(DbNameWithShard),
        DbNameWithShardAndExtension,
        ?l2b(DbNameWithShardAndExtension)
    ],
    [{DbName, Case} || Case <- Cases].

add_shard(DbName) ->
    "shards/00000000-3fffffff/" ++ DbName ++ ".1415960794".

generate_cases(DbName) ->
    [{DbName, DbName}, {DbName, ?l2b(DbName)}].

get_minimum_purge_sequence_test_() ->
    {
        foreach,
        fun setup_purge_seq/0,
        fun teardown_purge_seq/1,
        [
            ?TDEF_FE(t_purge_warn_log),
            ?TDEF_FE(t_purge_client_exists),
            ?TDEF_FE(t_purge_client_doesnt_exist),
            ?TDEF_FE(t_purge_client_check_throws_error)
        ]
    }.

setup_purge_seq() ->
    meck:new(couch_log, [passthrough]),
    meck:new(config, [passthrough]),
    meck:new(couch_db_plugin, [passthrough]),
    meck:expect(config, get, fun(_, _, Default) -> Default end),
    ok.

teardown_purge_seq(_) ->
    meck:unload().

t_purge_warn_log(_) ->
    Now = erlang:system_time(second),

    Updated1 = Now - ?PURGE_LAG_SEC - 100,
    purge_client_warn_lag(Updated1, <<"foo">>, <<"bar">>),
    ?assertEqual(1, meck:num_calls(couch_log, error, 2)),

    meck:reset(couch_log),
    Updated2 = Now - ?PURGE_LAG_SEC + 100,
    purge_client_warn_lag(Updated2, <<"foo">>, <<"bar">>),
    ?assertEqual(0, meck:num_calls(couch_log, error, 2)),

    meck:reset(couch_log),
    purge_client_warn_lag(not_a_number, <<"foo">>, <<"bar">>),
    ?assertEqual(1, meck:num_calls(couch_log, error, 2)).

t_purge_client_exists(_) ->
    Now = erlang:system_time(second),
    meck:expect(couch_db_plugin, is_valid_purge_client, 2, false),

    % If client doesn't exist, don't log
    Props = [{<<"updated_on">>, Now - ?PURGE_LAG_SEC + 100}],
    ?assertNot(purge_client_exists(<<"foo">>, <<"bar">>, Props)),
    ?assertEqual(0, meck:num_calls(couch_log, error, 2)).

t_purge_client_doesnt_exist(_) ->
    Now = erlang:system_time(second),
    meck:expect(couch_db_plugin, is_valid_purge_client, 2, true),

    % Log if stale
    Props1 = [{<<"updated_on">>, Now - ?PURGE_LAG_SEC - 100}],
    ?assert(purge_client_exists(<<"foo">>, <<"bar">>, Props1)),
    ?assertEqual(1, meck:num_calls(couch_log, error, 2)),

    % Not stale, so don't log
    meck:reset(couch_log),
    Props2 = [{<<"updated_on">>, Now - ?PURGE_LAG_SEC + 100}],
    ?assert(purge_client_exists(<<"foo">>, <<"bar">>, Props2)),
    ?assertEqual(0, meck:num_calls(couch_log, error, 2)),

    % Invalid checkpoint doc props, log an error
    meck:reset(couch_log),
    Props3 = [{<<"invalid_props">>, yes}],
    ?assert(purge_client_exists(<<"foo">>, <<"bar">>, Props3)),
    ?assertEqual(1, meck:num_calls(couch_log, error, 2)),

    % Log if invalid checkpoint timestamp
    meck:reset(couch_log),
    Props4 = [{<<"updated_on">>, not_valid}],
    ?assert(purge_client_exists(<<"foo">>, <<"bar">>, Props4)),
    ?assertEqual(1, meck:num_calls(couch_log, error, 2)).

t_purge_client_check_throws_error(_) ->
    Now = erlang:system_time(second),
    meck:expect(couch_db_plugin, is_valid_purge_client, 2, meck:raise(error, bam)),

    % If error is throw when checking validy expect a log but return true as if
    % the puarge client exists.
    Props = [{<<"updated_on">>, Now - ?PURGE_LAG_SEC + 100}],
    ?assert(purge_client_exists(<<"foo">>, <<"bar">>, Props)),
    ?assertEqual(1, meck:num_calls(couch_log, error, 2)).

-endif.
